热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

块-将输入发送到python子进程管道。-blocks-sendinputtopythonsubprocesspipeline

Imtestingsubprocessespipelineswithpython.ImawarethatIcandowhattheprogramsbelowdoi

I'm testing subprocesses pipelines with python. I'm aware that I can do what the programs below do in python directly, but that's not the point. I just want to test the pipeline so I know how to use it.

我正在用python测试子进程管道。我知道我可以直接用python做下面的程序,但这不是重点。我只是想测试一下管道,所以我知道如何使用它。

My system is Linux Ubuntu 9.04 with default python 2.6.

我的系统是Linux Ubuntu 9.04和默认的python 2.6。

I started with this documentation example.

我从这个文档示例开始。

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

That works, but since p1's stdin is not being redirected, I have to type stuff in the terminal to feed the pipe. When I type ^D closing stdin, I get the output I want.

这是可行的,但是由于p1的stdin没有被重定向,所以我必须在终端输入一些东西以满足管道的需要。当我键入D关闭stdin时,我得到我想要的输出。

However, I want to send data to the pipe using a python string variable. First I tried writing on stdin:

但是,我希望使用python字符串变量将数据发送到管道。首先,我试着在stdin上写:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Didn't work. I tried using p2.stdout.read() instead on last line, but it also blocks. I added p1.stdin.flush() and p1.stdin.close() but it didn't work either. I Then I moved to communicate:

没有工作。我尝试使用p2.stdout.read()而不是最后一行,但它也会阻塞。我添加了p1.stdin.flush()和p1.stdin.close(),但它也不起作用。然后我开始沟通:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

So that's still not it.

所以这还不是它。

I noticed that running a single process (like p1 above, removing p2) works perfectly. And passing a file handle to p1 (stdin=open(...)) also works. So the problem is:

我注意到,运行单个进程(如上面的p1,删除p2)工作得很好。将文件句柄传递给p1 (stdin=open(…))也可以工作。所以问题是:

Is it possible to pass data to a pipeline of 2 or more subprocesses in python, without blocking? Why not?

是否有可能将数据传递给python中2个或更多的子进程,而不阻塞?为什么不呢?

I'm aware I could run a shell and run the pipeline in the shell, but that's not what I want.

我知道我可以运行一个shell并在shell中运行管道,但这不是我想要的。


UPDATE 1: Following Aaron Digulla's hint below I'm now trying to use threads to make it work.

更新1:根据Aaron Digulla的提示,我现在正在尝试使用线程来实现它。

First I've tried running p1.communicate on a thread.

首先,我尝试运行p1.在线程上进行通信。

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Okay, didn't work. Tried other combinations like changing it to .write() and also p2.read(). Nothing. Now let's try the opposite approach:

好了,没有工作。尝试其他组合,如将其更改为.write()和p2.read()。什么都没有。现在让我们尝试相反的方法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

code ends up blocking somewhere. Either in the spawned thread, or in the main thread, or both. So it didn't work. If you know how to make it work it would make easier if you can provide working code. I'm trying here.

代码最终会在某处阻塞。无论是在派生的线程中,还是在主线程中,或者两者都有。所以它没有工作。如果您知道如何使其工作,那么如果您能够提供工作代码,将会更容易。我在这里。


UPDATE 2

更新2

Paul Du Bois answered below with some information, so I did more tests. I've read entire subprocess.py module and got how it works. So I tried applying exactly that to code.

保罗·杜·波依斯在下面回答了一些信息,所以我做了更多的测试。我读过整个子流程。py模块,得到了它的工作原理。所以我试着把它精确地应用到代码中。

I'm on linux, but since I was testing with threads, my first approach was to replicate the exact windows threading code seen on subprocess.py's communicate() method, but for two processes instead of one. Here's the entire listing of what I tried:

我在linux上,但是因为我是用线程测试的,所以我的第一个方法是复制子过程中看到的准确的windows线程代码。py的通信()方法,但对于两个进程而不是一个。以下是我所尝试的全部清单:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Well. It didn't work. Even after p1.stdin.close() was called, p2.stdout.read() still blocks.

好。它没有工作。即使在p1.stdin.close()被调用之后,p2.stdout.read()仍然阻塞。

Then I tried the posix code on subprocess.py:

然后我在子过程中尝试了posix代码。

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Also blocks on select.select(). By spreading prints around, I found out this:

也块select.select()。通过四处传播,我发现:

  • Reading is working. Code reads many times during execution.
  • 阅读是有效的。代码在执行过程中多次读取。
  • Writing is also working. Data is written to p1.stdin.
  • 写作也是工作。数据写入p1.stdin。
  • At the end of numwrites, p1.stdin.close() is called.
  • 在numwrite的末尾,p1.stdin.close()被调用。
  • When select() starts blocking, only to_read has something, p2.stdout. to_write is already empty.
  • 当select()开始阻塞时,只有to_read有东西,p2.stdout。to_write已经空了。
  • os.read() call always returns something, so p2.stdout.close() is never called.
  • read()调用总是返回一些东西,所以p2.stdout.close()从不调用。

Conclusion from both tests: Closing the stdin of the first process on the pipeline (grep in the example) is not making it dump its buffered output to the next and die.

这两种测试的结论是:关闭管道上第一个进程的stdin(例如grep),并不是将它的缓冲输出转储到下一个,然后死亡。

No way to make it work?

没有办法让它工作?

PS: I don't want to use a temporary file, I've already tested with files and I know it works. And I don't want to use windows.

我不想使用临时文件,我已经用文件测试过了,我知道它有效。我不想使用windows。

11 个解决方案

#1


21  

I found out how to do it.

我知道怎么做了。

It is not about threads, and not about select().

它不是关于线程的,而不是关于select()的。

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Lets call those a and b.

当我运行第一个进程(grep)时,它将创建两个低级的文件描述符,一个用于每个管道。我们称之为a和b。

When I run the second process, b gets passed to cut sdtin. But there is a brain-dead default on Popen - close_fds=False.

当我运行第二个进程时,b被传递给了sdtin。但是,在Popen - close_fds=False中存在一个脑死亡的默认值。

The effect of that is that cut also inherits a. So grep can't die even if I close a, because stdin is still open on cut's process (cut ignores it).

它的作用是,切割也继承了a,所以即使我关闭a, grep也不会死,因为stdin在cut的过程中仍然是开放的(cut忽略它)。

The following code now runs perfectly.

下面的代码现在运行得很好。

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on unix systems. On windows it closes all fds, so it prevents piping.

在unix系统中,close_fds=True应该是默认值。在windows上,它关闭所有的fds,因此它可以防止管道。

EDIT:

编辑:

PS: For people with a similar problem reading this answer: As pooryorick said in a comment, that also could block if data written to p1.stdin is bigger than the buffers. In that case you should chunk the data into smaller pieces, and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.

对于有类似问题的人来说,阅读这个答案:正如pooryorick在评论中所说的那样,如果数据写到p1,那也会阻塞。stdin比缓冲区大。在这种情况下,你应该把数据分成小块,并使用select.select()来知道什么时候读/写。问题中的代码应该给出如何实现这一点的提示。

EDIT2: Found another solution, with more help from pooryorick - instead of using close_fds=True and close ALL fds, one could close the fds that belongs to the first process, when executing the second, and it will work. The closing must be done in the child so the preexec_fn function from Popen comes very handy to do just that. On executing p2 you can do:

EDIT2:找到了另一个解决方案,从pooryorick那里得到了更多的帮助——而不是使用close_fds=True并关闭所有fds,一个可以关闭属于第一个进程的fds,当执行第二个进程时,它将工作。关闭必须在child中完成,所以Popen的preexec_fn函数非常方便。在执行p2时,你可以:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)

#2


6  

Working with large files

Two principles need to be applied uniformly when working with large files in Python.

在使用Python中的大文件时,需要一致地应用两个原则。

  1. Since any IO routine can block, we must keep each stage of the pipeline in a different thread or process. We use threads in this example, but subprocesses would let you avoid the GIL.
  2. 由于任何IO例程都可以阻塞,我们必须在不同的线程或进程中保持管道的每个阶段。我们在这个示例中使用线程,但是子进程会让您避免GIL。
  3. We must use incremental reads and writes so that we don't wait for EOF before starting to make progress.
  4. 我们必须使用增量读和写,这样我们才不会等待EOF才开始取得进展。

An alternative is to use nonblocking IO, though this is cumbersome in standard Python. See gevent for a lightweight threading library that implements the synchronous IO API using nonblocking primitives.

另一种方法是使用非阻塞IO,尽管这在标准Python中很麻烦。请参阅gevent,以实现使用非阻塞原语实现同步IO API的轻量级线程库。

Example code

We'll construct a silly pipeline that is roughly

我们将构建一个简单的管道。

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

where each stage in braces {} is implemented in Python while the others use standard external programs. TL;DR: See this gist.

在大括号{}中的每个阶段都是在Python中实现的,而其他阶段则使用标准的外部程序。TL;博士:看到这个要点。

We start with the expected imports.

我们从预期的进口开始。

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python stages of the pipeline

All but the last Python-implemented stage of the pipeline needs to go in a thread so that it's IO does not block the others. These could instead run in Python subprocesses if you wanted them to actually run in parallel (avoid the GIL).

除了最后一个python实现阶段之外,所有的线程都需要进入一个线程,这样它的IO就不会阻塞其他线程。如果您希望它们实际上并行运行(避免GIL),那么它们可以在Python子进程中运行。

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Each of these needs to be put in its own thread, which we'll do using this convenience function.

每一个都需要放到它自己的线程中,我们将使用这个便利函数。

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Create the pipeline

Create the external stages using Popen and the Python stages using spawn. The argument bufsize=-1 says to use the system default buffering (usually 4 kiB). This is generally faster than the default (unbuffered) or line buffering, but you'll want line buffering if you want to visually monitor the output without lags.

使用Popen和使用衍生工具的Python阶段创建外部阶段。参数bufsize=-1表示使用系统默认缓冲(通常为4 kiB)。这通常比默认的(未缓冲的)或行缓冲要快,但是如果您想在不滞后的情况下直观地监视输出,您将需要行缓冲。

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Drive the pipeline

Assembled as above, all the buffers in the pipeline will fill up, but since nobody is reading from the end (grepz.stdout), they will all block. We could read the entire thing in one call to grepz.stdout.read(), but that would use a lot of memory for large files. Instead, we read incrementally.

如上所述,管道中的所有缓冲区都将被填满,但由于没有人从末尾读取(grepz.stdout),它们将全部阻塞。我们可以在一个调用grepz.stdout.read()中读取整个事件,但是这会占用大量内存用于大型文件。相反,我们读增量。

for line in grepz.stdout:
    sys.stdout.write(line.lower())

The threads and processes clean up once they reach EOF. We can explicitly clean up using

线程和进程一旦到达EOF就会清除。我们可以明确地使用。

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 and earlier

Internally, subprocess.Popen calls fork, configures the pipe file descriptors, and calls exec. The child process from fork has copies of all file descriptors in the parent process, and both copies will need to be closed before the corresponding reader will get EOF. This can be fixed by manually closing the pipes (either by close_fds=True or a suitable preexec_fn argument to subprocess.Popen) or by setting the FD_CLOEXEC flag to have exec automatically close the file descriptor. This flag is set automatically in Python-2.7 and later, see issue12786. We can get the Python-2.7 behavior in earlier versions of Python by calling

在内部,子流程。Popen调用fork,配置管道文件描述符,并调用exec。来自fork的子进程在父进程中具有所有文件描述符的副本,并且在相应的阅读器将得到EOF之前,两个副本都需要被关闭。这可以通过手动关闭管道(通过close_fds=True或一个合适的preexec_fn参数到subprocess.Popen)来修复,或者通过设置FD_CLOEXEC标志,使exec自动关闭文件描述符。此标志自动设置在Python-2.7中,稍后将看到问题12786。我们可以通过调用来获得Python早期版本中的Python-2.7行为。

p._set_cloexec_flags(p.stdin)

before passing p.stdin as an argument to a subsequent subprocess.Popen.

之前通过p。将stdin作为后续子进程的参数。

#3


3  

There are three main tricks to making pipes work as expected

有三种方法可以让管道按照预期工作。

  1. Make sure each end of the pipe is used in a different thread/process (some of the examples near the top suffer from this problem).

    确保管道的每一端都在不同的线程/进程中使用(在顶部附近的一些示例会受到这个问题的影响)。

  2. explicitly close the unused end of the pipe in each process

    在每个进程中显式关闭管道未使用的端口。

  3. deal with buffering by either disabling it (Python -u option), using pty's, or simply filling up the buffer with something that won't affect the data, ( maybe '\n', but whatever fits).

    通过禁用它(Python -u选项),使用pty,或者简单地用一些不会影响数据的东西填充缓冲区(可能是“\n”,但无论如何)。

The examples in the Python "pipeline" module (I'm the author) fit your scenario exactly, and make the low-level steps fairly clear.

Python“管道”模块中的示例(我是作者)完全适合您的场景,并使底层步骤相当清晰。

http://pypi.python.org/pypi/pipeline/

http://pypi.python.org/pypi/pipeline/

More recently, I used the subprocess module as part of a producer-processor-consumer-controller pattern:

最近,我使用子流程模块作为生产者-处理器-消费者-控制器模式的一部分:

http://www.darkarchive.org/w/Pub/PythonInteract

http://www.darkarchive.org/w/Pub/PythonInteract

This example deals with buffered stdin without resorting to using a pty, and also illustrates which pipe ends should be closed where. I prefer processes to threading, but the principle is the same. Additionally, it illustrates synchronizing Queues to which feed the producer and collect output from the consumer, and how to shut them down cleanly (look out for the sentinels inserted into the queues). This pattern allows new input to be generated based on recent output, allowing for recursive discovery and processing.

这个示例处理的是缓冲的stdin,而不需要使用pty,还演示了应该在何处关闭管道端口。我更喜欢进程而不是线程,但原理是一样的。此外,它还演示了同步队列,以向生产者提供提要,并从消费者那里收集输出,以及如何干净地关闭它们(注意插入到队列中的哨兵)。此模式允许根据最近的输出生成新的输入,允许进行递归发现和处理。

#4


3  

Nosklo's offered solution will quickly break if too much data is written to the receiving end of the pipe:

如果将过多的数据写入到管道的接收端,Nosklo提供的解决方案将会很快中断:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

If this script doesn't hang on your machine, just increase "20000" to something that exceeds the size of your operating system's pipe buffers.

如果此脚本不挂在您的机器上,只需将“20000”增加到超过操作系统的管道缓冲区的大小。

This is because the operating system is buffering the input to "grep", but once that buffer is full, the p1.stdin.write call will block until something reads from p2.stdout. In toy scenarios, you can get way with writing to/reading from a pipe in the same process, but in normal usage, it is necessary to write from one thread/process and read from a separate thread/process. This is true for subprocess.popen, os.pipe, os.popen*, etc.

这是因为操作系统正在将输入缓冲到“grep”,但是一旦缓冲区满了,p1.stdin就会出现。写调用将阻塞,直到从p2.stdout读取。在toy场景中,您可以通过在同一个进程中从管道中写入/读取的方式获得方法,但是在正常使用情况下,必须从一个线程/进程写入,并从一个单独的线程/进程中读取。这对于子流程来说是正确的。popen,操作系统。管、操作系统。popen *等。

Another twist is that sometimes you want to keep feeding the pipe with items generated from earlier output of the same pipe. The solution is to make both the pipe feeder and the pipe reader asynchronous to the man program, and implement two queues: one between the main program and the pipe feeder and one between the main program and the pipe reader. PythonInteract is an example of that.

另一个难题是,有时您需要继续为管道提供与以前相同管道的输出所生成的项。解决方案是让管道馈线和管道阅读器异步到man程序,并实现两个队列:一个在主程序和管道馈线之间,另一个在主程序和管道阅读器之间。python互动就是一个例子。

Subprocess is a nice convenience model, but because it hides the details of the os.popen and os.fork calls it does under the hood, it can sometimes be more difficult to deal with than the lower-level calls it utilizes. For this reason, subprocess is not a good way to learn about how inter-process pipes really work.

子过程是一个很好的便利模型,但是因为它隐藏了操作系统的细节。popen和操作系统。fork调用它在引擎盖下做,它有时比它使用的低级调用更难处理。因此,子过程不是了解进程间管道真正工作的好方法。

#5


2  

You must do this in several threads. Otherwise, you'll end up in a situation where you can't send data: child p1 won't read your input since p2 doesn't read p1's output because you don't read p2's output.

您必须在几个线程中执行此操作。否则,你就会陷入无法发送数据的情况:p1不会读取你的输入,因为p2不会读取p1的输出,因为你不会读取p2的输出。

So you need a background thread that reads what p2 writes out. That will allow p2 to continue after writing some data to the pipe, so it can read the next line of input from p1 which again allows p1 to process the data which you send to it.

你需要一个后台线程来读取p2写出来的内容。这将允许p2在写入一些数据到管道之后继续,这样它就可以读取p1的下一行,这再次允许p1处理你发送给它的数据。

Alternatively, you can send the data to p1 with a background thread and read the output from p2 in the main thread. But either side must be a thread.

或者,您可以使用后台线程将数据发送到p1,并在主线程中读取p2的输出。但任何一方都必须是一条线。

#6


2  

Responding to nosklo's assertion (see other comments to this question) that it can't be done without close_fds=True:

响应nosklo的断言(参见其他对这个问题的评论),如果没有close_fds=True:

close_fds=True is only necessary if you've left other file descriptors open. When opening multiple child processes, it's always good to keep track of open files that might get inherited, and to explicitly close any that aren't needed:

如果您已经打开了其他的文件描述符,那么close_fds=True是必需的。在打开多个子进程时,最好保持对可能被继承的打开文件的跟踪,并明确关闭不需要的任何文件:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds defaults to False because subprocess prefers to trust the calling program to know what it's doing with open file descriptors, and just provide the caller with an easy option to close them all if that's what it wants to do.

close_fds默认为False,因为subprocess更喜欢信任调用程序,以了解它使用打开的文件描述符做什么,并且只向调用者提供一个简单的选项来关闭它们,如果这是它想要做的事情。

But the real issue is that pipe buffers will bite you for all but toy examples. As I have said in my other answers to this question, the rule of thumb is to not have your reader and your writer open in the same process/thread. Anyone who wants to use the subprocess module for two-way communication would be well-served to study os.pipe and os.fork, first. They're actually not that hard to use if you have a good example to look at.

但真正的问题是,管道缓冲区会咬你,除了玩具例子。正如我在回答这个问题的其他回答中所说的,经验法则是不要让你的读者和你的作者在同一个进程/线程中打开。任何想要使用子过程模块进行双向通信的人都可以很好地学习操作系统。管道和操作系统。叉,放在第一位。如果你有一个很好的例子,它们其实并不难用。

#7


1  

I think you may be examining the wrong problem. Certainly as Aaron says if you try to be both a producer to the beginning of a pipeline, and a consumer of the end of the pipeline, it is easy to get into a deadlock situation. This is the problem that communicate() solves.

我想你可能正在研究错误的问题。当然,正如Aaron所说,如果你试图同时成为管道的生产者,同时也是管道末端的消费者,那么很容易陷入僵局。这就是通信()解决的问题。

communicate() isn't exactly correct for you since stdin and stdout are on different subprocess objects; but if you take a look at the implementation in subprocess.py you'll see that it does exactly what Aaron suggested.

由于stdin和stdout在不同的子进程对象上,所以通信()并不完全正确。但是,如果您查看子流程中的实现。你会发现它和Aaron的建议完全一致。

Once you see that communicate both reads and writes, you'll see that in your second try communicate() competes with p2 for the output of p1:

一旦您看到了通信的读写,您将看到在您的第二次尝试通信()与p2的输出竞争p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

I am running on win32, which definitely has different i/o and buffering characteristics, but this works for me:

我在win32上运行,它确实有不同的I /o和缓冲特性,但是这对我来说很有用:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

I tuned the input size to produce a deadlock when using a naive unthreaded p2.read()

我调整了输入的大小,以在使用幼稚的无线程p2.read()时产生死锁

You might also try buffering into a file, eg

您还可以尝试缓冲到一个文件中。

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

That also works for me without deadlocks.

这对我也适用,没有死锁。

#8


1  

In one of the comments above, I challenged nosklo to either post some code to back up his assertions about select.select or to upvote my responses he had previously down-voted. He responded with the following code:

在上面的一个评论中,我向nosklo提出了挑战,要么发布一些代码来支持他关于select的断言。请选择或支持我的回答,他之前投票赞成。他的答复如下:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written 

One problem with this script is that it second-guesses the size/nature of the system pipe buffers. The script would experience fewer failures if it could remove magic numbers like 1024.

这个脚本的一个问题是,它对系统管道缓冲区的大小/本质进行了二次猜测。如果该脚本能够删除1024这样的神奇数字,那么它将经历更少的失败。

The big problem is that this script code only works consistently with the right combination of data input and external programs. grep and cut both work with lines, and so their internal buffers behave a bit differently. If we use a more generic command like "cat", and write smaller bits of data into the pipe, the fatal race condition will pop up more often:

最大的问题是,该脚本代码只适用于数据输入和外部程序的正确组合。grep和cut都使用行,所以它们的内部缓冲区的表现有点不同。如果我们使用更通用的命令,如“cat”,并将更小的数据写入管道中,致命的竞争条件将会更频繁地出现:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written 

In this case, two different results will manifest:

在这种情况下,会出现两个不同的结果:

write, write, close file, read -> success
write, read -> hang

So again, I challenge nosklo to either post code showing the use of select.select to handle arbitrary input and pipe buffering from a single thread, or to upvote my responses.

因此,我再次向nosklo提出了一些代码,以显示select的用法。选择处理来自单个线程的任意输入和管道缓冲,或者对我的响应进行向上投票。

Bottom line: don't try to manipulate both ends of a pipe from a single thread. It's just not worth it. See pipeline for a nice low-level example of how to do this correctly.

底线:不要试图从单个线程控制管道的两端。这是不值得的。请参阅管道,了解如何正确地执行该操作的低级示例。

#9


0  

What about using a SpooledTemporaryFile ? This bypasses (but perhaps doesn't solve) the issue:

使用一个临时文件怎么样?这条旁路(但也许没有解决)问题:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

http://docs.python.org/library/tempfile.html tempfile.SpooledTemporaryFile

You can write to it like a file, but it's actually a memory block.

你可以像一个文件一样写它,但它实际上是一个内存块。

Or am I totally misunderstanding...

或者我完全误解了……

#10


-1  

Here's an example of using Popen together with os.fork to accomplish the same thing. Instead of using close_fds it just closes the pipes at the right places. Much simpler than trying to use select.select, and takes full advantage of system pipe buffers.

这里有一个使用Popen和操作系统的例子。用叉来完成同样的事情。与其使用close_fds,它只在正确的地方关闭管道。比使用select更简单。选择,并充分利用系统管道缓冲区。

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()

#11


-1  

It's much simpler than you think!

它比你想象的要简单得多!

import sys
from subprocess import Popen, PIPE

# Pipe the command here. It will read from stdin.
#   So cat a file, to stdin, like (cat myfile | ./this.py),
#     or type on terminal and hit control+d when done, etc
#   No need to handle this yourself, that's why we have shell's!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)

nextData = None
while True:
    nextData = p.stdout.read()
    if nextData in (b'', ''):
        break
    sys.stdout.write ( nextData.decode('utf-8') )


p.wait()

This code is written for python 3.6, and works with python 2.7.

这段代码是为python 3.6编写的,与python 2.7兼容。

Use it like:

使用它:

cat README.md  | python ./example.py

or

python example.py 

To pipe the contents of "README.md" to this program.

给“自述”的内容管。医学博士”这个项目。

But.. at this point, why not just use "cat" directly, and pipe the output like you want? like:

但. .在这一点上,为什么不直接使用“cat”,并像您希望的那样使用管道输出呢?如:

cat filename | grep -v not | cut -c 1-10

typed into the console will do the job as well. I personally would only use the code option if I was further processing the output, otherwise a shell script would be easier to maintain and be retained.

输入到控制台也可以完成任务。如果我进一步处理输出,我个人只会使用代码选项,否则shell脚本将更容易维护和保留。

You just, use the shell to do the piping for you. In one, out the other. That's what she'll are GREAT at doing, managing processes, and managing single-width chains of input and output. Some would call it a shell's best non-interactive feature..

你只要用壳来为你做管道。其中一个,另一个。这就是她所擅长的,管理流程,管理输入和输出的单宽度链。有些人把它称为shell最好的非交互式特性。


推荐阅读
author-avatar
奶爸乳酪
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有